FLAME end-to-end: partitioner fix, execution metadata, setup callback #37

Merged: cigrainger merged 10 commits into main from fix/partitioner-adbc-columns on Mar 25, 2026
@cigrainger cigrainger commented Mar 25, 2026

Summary

End-to-end FLAME distributed queries now work from Livebook on Fly.io. This PR bundles all fixes discovered during testing:

Partitioner fixes

  • Flatten ADBC column batches (materialized.data is [[%Adbc.Column{}]], not [%Adbc.Column{}])
  • Use Adbc.Column.to_list/1 instead of Enum.to_list/1 for Arrow columns
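The two partitioner fixes can be illustrated with a minimal sketch. To keep it runnable without the adbc dependency, it uses a hypothetical `Col` struct in place of `%Adbc.Column{}` and a local `to_list/1` in place of `Adbc.Column.to_list/1`; the module and function names are assumptions for illustration, not the code in this PR.

```elixir
defmodule PartitionerSketch do
  # Hypothetical stand-in for %Adbc.Column{} so the sketch has no dependencies.
  defmodule Col do
    defstruct [:name, :values]
  end

  # Stand-in for Adbc.Column.to_list/1. The real struct does not implement
  # Enumerable, which is why Enum.to_list/1 cannot be used on it.
  def to_list(%Col{values: values}), do: values

  # `batches` is a list of batches, each batch a list of columns:
  # [[col, col], [col, col]]. Flatten first, then pick the named column
  # from every batch and concatenate its values.
  def column_values(batches, name) do
    batches
    |> List.flatten()
    |> Enum.filter(&(&1.name == name))
    |> Enum.flat_map(&to_list/1)
  end
end

batches = [
  [
    %PartitionerSketch.Col{name: "id", values: [1, 2]},
    %PartitionerSketch.Col{name: "quarter", values: ["q1", "q2"]}
  ],
  [
    %PartitionerSketch.Col{name: "id", values: [3]},
    %PartitionerSketch.Col{name: "quarter", values: ["q3"]}
  ]
]

PartitionerSketch.column_values(batches, "id")
# => [1, 2, 3]
```

Iterating the outer list without flattening would hand a whole batch (a list) to the column lookup, which is the crash described in the first commit message.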

FLAME improvements

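  • Sequential spin_up/2 placement (concurrent Task.async placement was tried, then reverted because it caused FLAME GenServer call timeouts); with max_concurrency: 1, each placement still boots a separate runner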
  • :setup callback option — runs on each worker after boot (e.g. S3 secrets, extensions)
  • Fixed status/1 to accept workers list directly (remote :pg unreliable across FLAME nodes)
  • Removed broken await_pg_registration (was using wrong :pg scope)

Execution metadata

  • Added meta field to %Dux{} struct
  • Coordinator populates meta with: n_workers, n_nodes, nodes, merge_strategy, total_duration_ms
  • kino_dux reads meta for rich distributed stats rendering
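As a rough illustration of the metadata shape, the sketch below builds a map with the five keys listed above from a list of worker PIDs. `MetaSketch.build/3` and the `:concat` merge strategy value are hypothetical; the coordinator's actual implementation is not shown in this description.

```elixir
defmodule MetaSketch do
  # Hypothetical helper: derives execution stats from worker PIDs.
  def build(worker_pids, merge_strategy, started_at_ms) do
    # node/1 returns the node a PID lives on; unique nodes give n_nodes.
    nodes = worker_pids |> Enum.map(&node/1) |> Enum.uniq()

    %{
      n_workers: length(worker_pids),
      n_nodes: length(nodes),
      nodes: nodes,
      merge_strategy: merge_strategy,
      total_duration_ms: System.monotonic_time(:millisecond) - started_at_ms
    }
  end
end

started = System.monotonic_time(:millisecond)

# Local processes stand in for remote FLAME workers, so n_nodes is 1 here.
workers = for _ <- 1..3, do: spawn(fn -> :ok end)

meta = MetaSketch.build(workers, :concat, started)
```

A renderer such as kino_dux could then pattern match on keys like `n_workers` and `n_nodes` without querying the cluster.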

Documentation

  • max_concurrency: 1 in all FLAME pool examples (with FLAME's default of 100, all workers land on the same machine)
  • LIVEBOOK_COOKIE via Node.get_cookie() not System.get_env()
  • boot_timeout: 120_000 for Fly cold starts
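Put together, a pool definition that follows these three points might look like the sketch below. It assumes a Livebook session (hence `Kino.start_child/1`) and the Fly backend; the `min` and `max` values are illustrative, and the Fly token and machine sizing options are left at their defaults.

```elixir
# Sketch of a FLAME pool for distributed Dux workers, started from Livebook.
Kino.start_child(
  {FLAME.Pool,
   name: :dux_pool,
   min: 0,
   max: 3,
   # One worker per runner, so each placed child forces a new machine.
   max_concurrency: 1,
   # Fly cold starts can exceed FLAME's default boot timeout.
   boot_timeout: 120_000,
   backend:
     {FLAME.FlyBackend,
      # Pass the running node's cookie so runners can connect back to it.
      env: %{"LIVEBOOK_COOKIE" => Node.get_cookie()}}}
)
```

The cookie comes from `Node.get_cookie()` because the value is set on the running node rather than being readable from the environment inside the notebook.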

Usage

workers = Dux.Flame.spin_up(3, pool: :dux_pool, setup: fn ->
  Dux.create_secret(:s3, type: :s3, region: "us-west-2")
end)

Dux.from_parquet("s3://ookla-open-data/parquet/performance/type=mobile/year=2023/quarter=*/*.parquet")
|> Dux.distribute(workers)
|> Dux.group_by(:quarter)
|> Dux.summarise(avg_download: avg(avg_d_kbps))
|> Dux.compute()
# => %Dux{meta: %{distributed: true, n_nodes: 3, ...}}

Dux.Flame.status(workers)
# => %{total_workers: 3, nodes: %{:"flame-abc@..." => 1, ...}}

Test plan

  • 37 distributed + streaming tests pass
  • Credo strict clean
  • End-to-end tested on Fly: Livebook → FLAME pool → 3 nodes → Ookla S3 → distributed compute → result

🤖 Generated with Claude Code

cigrainger and others added 9 commits March 25, 2026 13:46
materialized.data returns [[%Adbc.Column{}]] (list of batches),
not [%Adbc.Column{}]. find_column was iterating the outer list,
getting a batch (list) instead of individual columns, then crashing
on col.field.name.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adbc.Column structs don't implement Enumerable. Need to use
Adbc.Column.to_list/1 to extract values from Arrow columns.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sequential place_child calls all landed on the same runner since
there was no backpressure. Concurrent Task.async placement forces
the pool to boot new runners when max_concurrency is 1.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Concurrent placement caused GenServer call timeouts when multiple
runners booted simultaneously. Sequential is correct — with
max_concurrency: 1, each placed child holds its slot permanently,
guaranteeing the next place_child goes to a new runner.

Added :timeout option (default 120s) to handle slow Fly machine boots.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The pool needs time to process the slot replacement message
(caller PID → child PID) before the next checkout attempt.
Without this, sequential place_child calls can all see the
runner at count 0 and land on the same runner.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The root cause of all workers landing on the same runner was FLAME's
default max_concurrency of 100. DuckDB saturates cores internally,
so one worker per machine is optimal.

- Add max_concurrency: 1 to all FLAME pool examples
- Fix LIVEBOOK_COOKIE: use Node.get_cookie() not System.get_env()
- Add boot_timeout: 120_000 for Fly cold starts
- Remove debug logging from spin_up
- Document why max_concurrency: 1 matters

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- spin_up uses concurrent Task.async for parallel runner boot
- status accepts workers list directly (remote PIDs can't use :pg)
- Remove await_pg_registration (was using wrong :pg scope)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Execution metadata:
- Add `meta` field to %Dux{} struct
- Coordinator populates meta with distributed execution stats:
  n_workers, n_nodes, nodes, merge_strategy, total_duration_ms
- kino_dux can read meta for rich rendering

Setup callback:
- Worker.setup/3 runs a function on the worker's node
- spin_up/2 accepts :setup option, runs on all workers after boot
- Enables per-worker S3 secrets, extension loading, etc.

Concurrent spin_up:
- Uses Task.async for parallel runner boot
- status/1 accepts workers list directly (remote :pg unreliable)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@cigrainger cigrainger changed the title Fix partitioner ADBC column batch flattening FLAME end-to-end: partitioner fix, execution metadata, setup callback Mar 25, 2026
Concurrent Task.async + place_child causes internal FLAME GenServer
timeouts ({:placed_child, ...} 5s default). Sequential placement
works correctly — with max_concurrency: 1, each placement still
boots a separate runner.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@cigrainger cigrainger merged commit 210ed34 into main Mar 25, 2026
5 checks passed
@cigrainger cigrainger deleted the fix/partitioner-adbc-columns branch March 25, 2026 05:35